iT邦幫忙

2024 iThome 鐵人賽

DAY 10
0
Software Development

用 NestJS 闖蕩微服務!系列 第 10

[用NestJS闖蕩微服務!] DAY10 - gRPC Transporter (下)

  • 分享至 

  • xImage
  •  

gRPC Transporter

NestJS 實作了 gRPC Transporter,讓微服務應用程式可以用跟其他 Transporter 相似的開發風格來使用 gRPC。

前置作業

要使用 gRPC Transporter 之前需要先安裝下方套件:

$ npm install @grpc/grpc-js @grpc/proto-loader

補充@grpc/grpc-js 是一套用於 Node.js 的 gRPC Client,它本身並不處理讀取 Protocol Buffer 的工作,而是交由 @grpc/proto-loader 來處理,這兩者搭配使用即可在 Node.js 實現 gRPC 通訊,有興趣可以參考官方文件

在上一篇有提到可以透過 Protocol Buffer Compiler 將 Protocol Buffer 產生特定語言的程式碼,以 NestJS 來說,較簡單、整合較好的非 ts-proto 莫屬,它不僅可以將 Protocol Buffer 轉成 TypeScript,還能夠指定轉換成 NestJS 的相關程式碼,非常強大。透過下方指令進行安裝:

$ npm install ts-proto -D

編譯 Protocol Buffer

首先,我們定義兩個 Protocol Buffer 供後續範例使用,分別是 todo.prototodo_service.proto,下方是 todo.proto 的內容,定義了 Todo 並將其封裝於 todo.definition 中:

syntax = "proto3";

package todo.definition;

message Todo {
  string id = 1;
  string title = 2;
  bool completed = 3;
  optional string description = 4; 
}

接著,在 todo_service.proto 使用 todo.proto,並定義 TodoService 與其相關 Message:

syntax = "proto3";

import "todo.proto";

package todo.service;

message GetTodoRequest {
  string id = 1;
}

message GetTodoResponse {
  todo.definition.Todo todo = 1;
}

message GetTodosRequest {
  repeated string ids = 1;
}

message GetTodosResponse {
  repeated todo.definition.Todo todos = 1;
}

message CreateTodoRequest {
  string title = 1;
  optional string description = 2;
}

message CreateTodoResponse {
  todo.definition.Todo todo = 1;
}

message CompleteTodoRequest {
  string id = 1;
}

message CompleteTodoResponse {
  todo.definition.Todo todo = 1;
}

service TodoService {
  rpc GetTodo(GetTodoRequest) returns (GetTodoResponse);
  rpc GetTodos(stream GetTodoRequest) returns (stream GetTodoResponse);
  rpc GetTodosByIds(GetTodosRequest) returns (stream GetTodoResponse);
  rpc GetTodosThroughStream(stream GetTodoRequest) returns (GetTodosResponse);
  rpc CreateTodo(CreateTodoRequest) returns (CreateTodoResponse);
  rpc CompleteTodo(CompleteTodoRequest) returns (CompleteTodoResponse);
}

定義完之後,要透過 ts-proto 進行編譯,下方是編譯的指令:

$ protoc --plugin=./node_modules/.bin/proto-gen-ts_proto --ts_proto_out=<DST_DIR> --proto_path=<IMPORT_PATH> ./todo_service.proto --ts_proto_opt=nestJs=true,addGrpcMetadata=true,addNestjsRestParameter=true

從指令中可以看到有很多個參數,它們的功能如下:

  • --plugin:指定使用的插件,這邊指定了 ts-proto,位於 ./node_modules/.bin/proto-gen-ts_proto
  • --ts_proto_out:指定產生出來的 TypeScript 檔案位置。
  • --ts_proto_opt:設定 ts-proto 的選項配置,這裡指定了 nestJs=true 來產生 NestJS 所需的程式碼,並設定 addGrpcMetadata=true 來產生 Metadata 相關的程式碼,還可以看到設定了 addNestjsRestParameter=true,它可以避免產生出來的介面過於嚴謹、缺乏彈性,導致無法正確實作 Handler。

產生出來的檔案會有兩個,分別是 todo_service.tstodo.ts,會發現產生的檔案名稱會與 Protocol Buffer 的檔案名稱相同。

ts-proto generated files

建立微服務應用程式

修改載入點 main.ts 的內容,將 transport 設定為 Transport.GRPC,並設定 packagetodo_service.ts 中的 TODO_SERVICE_PACKAGE_NAME,同時指定 protoPathtodo_service.proto 的路徑:

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { join } from 'path';
import { AppModule } from './app.module';
import { TODO_SERVICE_PACKAGE_NAME } from './todo/todo_service';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.GRPC,
      options: {
        package: TODO_SERVICE_PACKAGE_NAME,
        protoPath: join(__dirname, 'todo/todo_service.proto'),
      },
    },
  );
  await app.listen();
}
bootstrap();

上方範例在 options 只使用了 packageprotoPath,事實上,gRPC Transporter 的 options 有以下幾個屬性可以設定:

  • package:為必填項目,需填入 .proto 中的 package 名稱,若以 ts-proto 進行編譯且指定輸出成 NestJS 程式碼的話,會有相關的常數可以使用,常數名稱格式為 <FILE_NAME>_PACKAGE_NAME
  • protoPath:為必填項目,需填入 .proto 的路徑。
  • url:要建立連線的位址,格式為 <IP>:<PORT>,預設為 localhost:5000
  • protoLoader:讀取、解析 Protocol Buffer 的套件名稱,預設為 @grpc/proto-loader
  • loader@grpc/proto-loader 的相關設定,詳細內容可以參考官方文件
  • credentials:憑證相關設定。

由於我們需要讀取 Protocol Buffer,但它並不是 TypeScript 檔案,對 NestJS 而言並不會主動將該檔案進行編譯、搬移,所以如果直接執行會無法正確讀取 .proto,我們需要調整 nest-cli.json 的內容,在 compilerOptions 裡面設置 assets,將 .proto 視為資產,在編譯 NestJS 應用程式時,會一同搬遷至 dist 資料夾中,這裡為了開發方便,再額外將 watchAssets 設為 true,讓 NestJS 可以持續觀察這些非 TypeScript 檔案的變化,省去重啟應用程式的時間:

{
  "compilerOptions": {
    "assets": ["**/*.proto"],
    "watchAssets": true
  }
}

Transporter 訊息模式

gRPC Transporter 實作訊息模式的方式與其他 Transporter 皆不同,並 不會 使用 @MessagePattern@EventPattern 裝飾器來實作 Handler,而是使用 gRPC 專屬的裝飾器,接下來會分別介紹 4 種不同的 RPC 設計該如何運用這些裝飾器進行實作。

Unary RPC

Unary RPC 是最簡單的 RPC 類型,以前面定義的 TodoService 來說,GetTodoCreateTodoCompleteTodo 都屬於這種類型。修改 AppController 的內容,分別設計 getTodocreateTodocompleteTodo 三個 Handler,並套上 @GrpcMethod 裝飾器,並在裝飾器帶入 todo_service.ts 裡面提供的 TODO_SERVICE_NAME

import { Controller } from '@nestjs/common';
import { GrpcMethod, RpcException } from '@nestjs/microservices';
import { Metadata, status as GrpcStatus, ServerUnaryCall } from '@grpc/grpc-js';
import { defer } from 'rxjs';
import {
  TODO_SERVICE_NAME,
  GetTodoRequest,
  GetTodoResponse,
  CreateTodoRequest,
  CreateTodoResponse,
  CompleteTodoRequest,
  CompleteTodoResponse
} from './todo/todo_service';
import { Todo } from './todo/todo';

@Controller()
export class AppController {
  private todos: Array<Todo> = [];

  @GrpcMethod(TODO_SERVICE_NAME, 'GetTodo')
  getTodo(
    data: GetTodoRequest,
    metadata: Metadata,
    call: ServerUnaryCall<GetTodoRequest, GetTodoResponse>
  ): GetTodoResponse {
    const todo = this.todos.find((todo) => todo.id === data.id);

    if (!todo) {
      throw new RpcException({
        code: GrpcStatus.NOT_FOUND,
      });
    }

    return { todo };
  }

  @GrpcMethod(TODO_SERVICE_NAME)
  createTodo(data: CreateTodoRequest, metadata: Metadata): Observable<CreateTodoResponse> {
    const todo: Todo = {
      id: Math.random().toString(),
      title: data.title,
      description: data.description,
      completed: false,
    };
    return defer(() => {
      this.todos.push(todo);
      return Promise.resolve({ todo });
    });
  }

  @GrpcMethod(TODO_SERVICE_NAME)
  completeTodo(data: CompleteTodoRequest, metadata: Metadata): CompleteTodoResponse {
    const todo = this.todos.find((todo) => todo.id === data.id);

    if (!todo) {
      throw new RpcException({
        code: GrpcStatus.NOT_FOUND,
      });
    }

    todo.completed = true;

    return { todo };
  }
}

從上方的範例程式碼可以看到 getTodo 套用的 @GrpcMethod 帶了兩個參數,第一個是 TODO_SERVICE_NAME,即 TodoService,第二個參數為對應的函式名稱,即 GetTodo,這樣 NestJS 就知道這個 Handler 是哪個 Service 的哪個函式的實作,但可以從 createTodocompleteTodo 看出,第二個參數並 不是必填,NestJS 會自動判斷 Handler 的名稱是否為 TodoService 定義的函式,不過 NestJS 為了保持 Handler 命名的一致性,並不是以 PascalCase 來命名 Handler,而是以 小寫駝峰 的方式。

而一個 gRPC Transporter 的 Handler 會有三個參數,分別是請求的資料、Metadata 與 GrpcCall 物件,其中以請求的資料最為重要,其他如果不使用是可以不寫的。

透過 Postman 進行測試,建立 gRPC 請求,選擇「Service definition」頁籤並將 todo_service.proto 匯入:

Import gRPC Service Definition

匯入完畢後,將上方 URL 輸入 微服務應用程式架設 gRPC Server 的位址,旁邊的選擇器選擇 「TodoService/CreateTodo」,並在「Message」帶入測試資料:

{
  "title": "Test1"
}

畫面如下所示:

gRPC Transporter Test Unary RPC Ready1

此時按下「Invoke」會看到來自微服務應用程式的回應:

gRPC Transporter Test Unary RPC Result1

再建立一個「TodoService/GetTodo」進行測試,並在「Message」帶入剛剛建立的那筆資料的 id

gRPC Transporter Test Unary RPC Ready2

此時按下「Invoke」會看到來自微服務應用程式的回應:

gRPC Transporter Test Unary RPC Result2

如果帶入一組不存在的 id 則會順利收到 Status Code 為 5 NOT_FOUND 的錯誤,原因是我們在程式碼中有針對找不到的情況拋出 RpcException,並帶入 codeNOT_FOUND

gRPC Transporter Test Unary RPC Result3

最後,建立一個「TodoService/CompleteTodo」進行測試,在「Message」帶入剛剛建立的那筆資料的 id

gRPC Transporter Test Unary RPC Ready3

此時按下「Invoke」會看到來自微服務應用程式的回應:

gRPC Transporter Test Unary Result4

Server Streaming RPC

Server Streaming RPC 在實現上與 Unary RPC 相似,只要將回傳值設為 Observable 即可,該 Observable 進入 complete 狀態即表示串流結束。前面定義 TodoService 裡面的 GetTodosByIds 即屬於這個類型,現在修改 AppController 的內容,設計 getTodosByIds 方法並套用 @GrpcMethod 裝飾器:

import { Controller } from '@nestjs/common';
import {
  // ...
  GrpcMethod
} from '@nestjs/microservices';
import {
  // ...
  from
} from 'rxjs';
import {
  TODO_SERVICE_NAME,
  // ...
  GetTodoResponse,
  GetTodosRequest,
} from './todo/todo_service';
import { Todo } from './todo/todo';
// ...

@Controller()
export class AppController {
  private todos: Array<Todo> = [];
  // ...
  @GrpcMethod(TODO_SERVICE_NAME)
  getTodosByIds(data: GetTodosRequest): Observable<GetTodoResponse> {
    const todos = this.todos.filter((todo) => data.ids.includes(todo.id));
    return from(todos).pipe(map((todo) => ({ todo })));
  }
}

上方範例從 todos 中找出 id 相符的 Todo,接著,運用 RxJS 的 from 將陣列內的元素一個一個發出,當陣列中每個元素都發出後,即進入 complete 狀態,實現串流的效果。

透過 Postman 進行測試,先透過前面實作的 CreateTodo 建立多筆資料,接著,建立一個「TodoService/GetTodosByIds」進行測試,並在「Message」帶入剛剛建立的那些資料的 id

gRPC Transporter Test Server Streaming RPC Ready1

此時按下「Invoke」會看到來自微服務應用程式的多筆回應:

gRPC Transporter Test Server Streaming RPC Result1

Client Streaming RPC

Client Streaming RPC 在實作上也會使用 RxJS,因為它對於串流的表現相當出色,可以用更簡潔的方式來達成。前面定義 TodoService 裡面的 GetTodosThroughStream 即屬於這個類型,現在修改 AppController 的內容,設計 getTodosThroughStream 方法並套上 @GrpcStreamMethod 裝飾器:

import { Controller } from '@nestjs/common';
import {
  // ...
  GrpcStreamMethod
} from '@nestjs/microservices';
import {
  // ...
  map,
  filter,
  toArray
} from 'rxjs';
import {
  TODO_SERVICE_NAME,
  // ...
  GetTodosResponse,
  GetTodoRequest,
} from './todo/todo_service';
import { Todo } from './todo/todo';
// ...

const isDefined = (value: Todo | undefined): value is Todo => !!value;

@Controller()
export class AppController {
  private todos: Array<Todo> = [];
  // ...
  @GrpcStreamMethod(TODO_SERVICE_NAME)
  getTodosThroughStream(request$: Observable<GetTodoRequest>): Observable<GetTodosResponse> {
    const todos$ = request$.pipe(
      map(({ id }) => this.todos.find((todo) => todo.id === id)),
      filter(isDefined),
      toArray(),
    );
    return todos$.pipe(map((todos) => ({ todos })));
  }
}

上方範例可以看到 Handler 的參數會是一個 Observable,它會持續傳入由 Client 發送的資料,所以我們可以運用 map 針對每一筆傳進來的 idtodos 找出對應的資料,如果沒找到就透過 filter 濾掉,最後 Client 端結束串流時,request$ 就會進入 complete 狀態,此時 toArray 就會把先前過濾出來的結果匯集成一個 Todo 陣列,最後再透過 map 轉換成 GetTodosResponse 的格式。

補充:事實上,NestJS 針對 Stream 有兩種處理方式,分別是 RxJSCall Stream,但因為 Call Stream 的處理方式並 沒有 RxJS 處理來得方便,可讀性也沒有比較高,所以在本篇系列文不會特別說明,如果真的很有興趣,可以參考官方文件的說明。

透過 Postman 進行測試,先透過前面實作的 CreateTodo 建立多筆資料,接著,建立一個「TodoService/GetTodosThroughStream」進行測試,並在「Message」帶入剛剛建立的其中一筆資料的 id

gRPC Transporter Test Client Streaming RPC Ready1

此時按下「Invoke」並按下方「Send」按鈕,會在「Responses」看見發送出去的資料:

gRPC Transporter Test Client Streaming RPC Ready2

接著,再把一筆資料的 id 帶入「Message」並再次按下「Send」:

gRPC Transporter Test Client Streaming RPC Ready3

最後,點擊「End Streaming」結束串流,此時會看到微服務應用程式的回應:

gRPC Transporter Test Client Streaming RPC Result1

Bidirectional Streaming RPC

Bidirectional Streaming RPC 也是以 RxJS 來處理,實作方式也與 Client Streaming RPC 相似。前面定義 TodoService 裡面的 GetTodos 即屬於這個類型,現在修改 AppController 的內容,設計 getTodos 方法並套上 @GrpcStreamMethod 裝飾器:

import { Controller } from '@nestjs/common';
import {
  // ...
  GrpcStreamMethod
} from '@nestjs/microservices';
import {
  // ...
  map,
  filter
} from 'rxjs';
import {
  TODO_SERVICE_NAME,
  // ...
  GetTodoResponse,
  GetTodoRequest,
} from './todo/todo_service';
import { Todo } from './todo/todo';
// ...

const isDefined = (value: Todo | undefined): value is Todo => !!value;

@Controller()
export class AppController {
  private todos: Array<Todo> = [];
  // ...
  @GrpcStreamMethod(TODO_SERVICE_NAME)
  getTodos(request$: Observable<GetTodoRequest>): Observable<GetTodoResponse> {
    const todo$ = request$.pipe(
      map(({ id }) => this.todos.find((todo) => todo.id === id)),
      filter(isDefined),
    );
    return todo$.pipe(map((todo) => ({ todo })));
  }
}

上方範例跟 getTodosThroughStream 最大的差異在於 沒有 使用 toArray 等待 request$ 進入 complete 狀態進行匯集,原因是 Bidirectional Streaming RPC 是雙向 Stream 的設計,這邊的設計是只要找到一筆資料就會立即回覆給 Client。

透過 Postman 進行測試,先透過前面實作的 CreateTodo 建立多筆資料,接著,建立一個「TodoService/GetTodos」進行測試,並在「Message」帶入剛剛建立的其中一筆資料的 id

gRPC Transporter Test Bidirectional Streaming RPC Ready1

此時按下「Invoke」並按下方「Send」按鈕,會在「Responses」看見發送出去的資料與回應:

gRPC Transporter Test Bidirectional Streaming RPC Result1

接著,再把一筆資料的 id 帶入「Message」並再次按下「Send」:

gRPC Transporter Test Client Streaming RPC Result2

最後,點擊「End Streaming」結束串流:

gRPC Transporter Test Client Streaming RPC Result3

Metadata

對微服務應用程式來說,可以透過 Handler 的第二個參數取得來自 Client 的 Metadata,那麼微服務應用程式本身要如何傳遞 Metadata 給 Client 呢?這時候就需要使用到 Handler 的第三個參數 ServerUnaryCall,透過其 sendMetadata 方法即可將 Metadata 發送給 Client。下方是範例程式碼,修改 AppController 的內容,讓 createTodo 發送 x-version1 的 Metadata:

import { Controller } from '@nestjs/common';
import {
  GrpcMethod,
  // ...
  } from '@nestjs/microservices';
import {
  Metadata,
  ServerUnaryCall,
  // ...
  } from '@grpc/grpc-js';
import { defer } from 'rxjs';
import {
  TODO_SERVICE_NAME,
  // ...
  CreateTodoRequest,
  CreateTodoResponse,
} from './todo/todo_service';
import { Todo } from './todo/todo';

@Controller()
export class AppController {
  private todos: Array<Todo> = [];
  // ...

  @GrpcMethod(TODO_SERVICE_NAME)
  createTodo(
    data: CreateTodoRequest,
    metadata: Metadata,
    call: ServerUnaryCall<CreateTodoRequest, CreateTodoResponse>
  ): Observable<CreateTodoResponse> {
    const todo: Todo = {
      id: Math.random().toString(),
      title: data.title,
      description: data.description,
      completed: false,
    };

    return defer(() => {
      this.todos.push(todo);

      const serverMetadata = new Metadata();
      serverMetadata.add('x-version', '1');
      call.sendMetadata(serverMetadata);

      return Promise.resolve({ todo });
    });
  }
}

透過 Postman 進行測試,使用前面建立的「TodoService/CreateTodo」並點擊「Invoke」,會在「Metadata」看到 x-version1 的訊息:

gRPC Transporter Test Metadata Result1

自動關聯服務定義

前面提到 @GrpcMethod@GrpcStreamMethod 要在第一個參數帶入服務名稱,但如果這個 Controller 本身就只會對應一個服務,那每個 Handler 都要帶入服務名稱就顯得有些多餘,所以 NestJS 有支援自動關聯服務定義的功能,只要 Controller 名稱與定義的服務名稱相同 就會自動進行推斷。下方是範例程式碼:

// ...
@Controller()
export class TodoService {
  private todos: Array<Todo> = [];

  @GrpcMethod()
  getTodo(data: GetTodoRequest, metadata: Metadata): GetTodoResponse {
    // ...
  }

  @GrpcMethod()
  createTodo(data: CreateTodoRequest, metadata: Metadata): Observable<CreateTodoResponse> {
    // ...
  }

  @GrpcMethod()
  completeTodo(data: CompleteTodoRequest) {
    // ...
  }

  @GrpcMethod()
  getTodosByIds(data: GetTodosRequest): Observable<GetTodoResponse> {
    // ...
  }

  @GrpcStreamMethod()
  getTodos(request: Observable<GetTodoRequest>): Observable<GetTodoResponse> {
    // ...
  }

  @GrpcStreamMethod()
  getTodosThroughStream(request: Observable<GetTodoRequest>): Observable<GetTodosResponse> {
    // ...
  }
}

補充:通常我們在 Protocol Buffer 會以 XXXService 來定義服務名稱,但在 NestJS 的世界裡,經常會定義 Service 這個角色來處理某些商業邏輯,如果要使用自動關聯服務定義的話,勢必就會遇到 Controller 以 Service 結尾來命名的問題,導致難以識別 Service 是哪種 Service,所以在實務上建議與團隊成員們達成共識並仔細評估後再決定要不要使用。

gRPC Controller 裝飾器

除了自動關聯服務定義外,透過 ts-proto 產生的程式碼中,有提供一個裝飾器可以讓我們 省去所有 Handler 加上裝飾器的步驟,並且 不需要 調整 Controller 名稱,只要 Handler 依照 小寫駝峰 的規則來命名即可,而這個神奇的裝飾器會以 <SERVICE_NAME>ControllerMethods 命名。下方是範例程式碼,在 AppController 上添加 @TodoServiceControllerMethods 裝飾器,並將所有 Handler 上放的裝飾器移除,這邊會建議讓 AppController 實作 ts-proto 產生的 <SERVICE_NAME>Controller 介面,避免有忘記實作 Handler 的情況,以這個範例來說,要實作 TodoServiceController

// ...
import {
  // ...
  TodoServiceController,
  TodoServiceControllerMethods
} from './todo/todo_service';

@TodoServiceControllerMethods()
@Controller()
export class AppController implements TodoServiceController {
  private todos: Array<Todo> = [];

  getTodo(data: GetTodoRequest, metadata: Metadata) {
    // ...
  }

  createTodo(data: CreateTodoRequest, metadata: Metadata) {
    // ...
  }

  completeTodo(data: CompleteTodoRequest) {
    // ...
  }

  getTodosByIds(data: GetTodosRequest) {
    // ...
  }

  getTodos(request: Observable<GetTodoRequest>) {
    // ...
  }

  getTodosThroughStream(request: Observable<GetTodoRequest>) {
    // ...
  }
}

建立客戶端

gRPC Transporter 提供的 Client 與其他 Transporter 不同,它不使用 ClientProxy,取而代之的是 ClientGrpc,使用方式也與其他 Transporter 有很大的差異。

注意:由於 Client 端也需要使用 .proto,故需要調整 nest-cli.json 的內容,並透過 Protocol Buffer Compiler 產生相關程式碼。

修改 AppModule 的內容,透過 ClientsModule 建立 gRPC Transporter 的 ClientGrpc,其中,namepackage 都指定為 TODO_SERVICE_PACKAGE_NAMEurl 則要帶入 gRPC Server 的位址,以我們的情境來說就是 localhost:5000

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { join } from 'path';
import { TODO_SERVICE_PACKAGE_NAME } from './todo/todo_service';
// ...

@Module({
  // ...
  imports: [
    ClientsModule.register([
      {
        name: TODO_SERVICE_PACKAGE_NAME,
        transport: Transport.GRPC,
        options: {
          package: TODO_SERVICE_PACKAGE_NAME,
          protoPath: join(__dirname, 'todo/todo_service.proto'),
          url: 'localhost:5000',
        }
      }
    ])
  ]
})
export class AppModule {}

傳送訊息

在開始傳送訊息之前,我們需要先取得 todo.service 這個 package 底下的 TodoService Client,讓我們可以透過它來執行 RPC Call。修改 AppController 的內容,透過 @Inject 裝飾器注入 ClientGrpc,並實作 OnModuleInit 介面,在 onModuleInit 階段透過 ClientGrpcgetService 方法取得 TodoService 的 Client,這裡我們可以使用 ts-proto 編譯出來的 TodoServiceClient 介面來作為該 Client 的型別:

import { Controller, OnModuleInit } from '@nestjs/common';
import { ClientGrpc } from '@nestjs/microservices';
import {
  TodoServiceClient,
  TODO_SERVICE_PACKAGE_NAME,
  TODO_SERVICE_NAME
} from './todo/todo_service';

@Controller('todos')
export class AppController implements OnModuleInit {
  private todoServiceClient: TodoServiceClient;

  constructor(
    @Inject(TODO_SERVICE_PACKAGE_NAME)
    private readonly client: ClientGrpc
  ) {}

  onModuleInit() {
    this.todoServiceClient = this.client.getService(TODO_SERVICE_NAME);
  }
}

接著,實作兩個 API 來使用 TodoServiceClient,修改 AppController 的內容,設計 createTodogetTodos 方法,分別使用 TodoServiceClientcreateTodogetTodosThroughStream 來示範如何呼叫非串流與串流的 RPC Call:

import {
  // ...
  Post,
  Query,
  ParseArrayPipe
} from '@nestjs/common';
import { Metadata } from '@grpc/grpc-js';
import { from } from 'rxjs';
import {
  // ...
  GetTodoRequest
} from './todo/todo_service';
// ...

@Controller('todos')
export class AppController implements OnModuleInit {
  private todoServiceClient: TodoServiceClient;

  // ...

  @Post()
  createTodo(@Body() todo: CreateTodoRequest) {
    return this.todoServiceClient.createTodo(todo, new Metadata());
  }

  @Get()
  getTodos(@Query('ids', ParseArrayPipe) ids: Array<string>) {
    const payloads = ids.map<GetTodoRequest>((id) => ({ id }));
    const request$ = from(payloads);
    return this.todoServiceClient.getTodosThroughStream(request$, new Metadata());
  }
}

從上方範例可以看出,對 Client 端而言就是呼叫函式,完全不需要在意底層的通訊實作。createTodo 因為是 Unary RPC,所以在呼叫上就非常單純,而 getTodosThroughStream 因為是 Client Streaming RPC,所以它的參數會帶入一個 Observable,每當有值發出時,就會持續向 gRPC Server 發送,直到該 Observable 進入 complete 狀態才會關閉串流。

注意:上方範例有使用到 ParseArrayPipe,它是自動將 Query String 以某種方式拆分成陣列的 Pipe,但它會需要安裝 class-validatorclass-transformer,如果在執行時發生相關錯誤,只需要透過 npm 進行安裝即可。

透過 Postman 使用 POST 方法存取 http://localhost:3000/todos,並在 Body 帶入下方資料:

{
  "title": "Test",
  "description": "Test"
}

此時會看到下方的結果:

NestJS gRPC Client Result1

接著,把剛剛建立的那筆 id 記下來,透過 Postman 使用 GET 方法存取 http://localhost:3000/todos?ids=<TODO_ID>,會看到下方結果:

NestJS gRPC Client Result2

小結

回顧一下本篇的重點內容,一開始先將會用到的套件裝起來,其中,ts-proto 可以編譯 Protocol Buffer 產生 NestJS 相關程式碼,包含:Package 的名稱、服務的名稱、Message 的介面等,可以大幅減少手工定義的時間,甚至還產生了 Controller 用的裝飾器來包裝 gRPC Handler,是一個非常值得推薦的第三方套件。

gRPC 不使用 @MessagePattern@EventPattern 裝飾器,而是針對 RPC 類型使用專屬的裝飾器,像是:用 @GrpcMethod 來處理 Unary RPC 與 Server Streaming RPC 的情況、用 @GrpcStreamMethod 來處理 Client Streaming RPC 與 Bidirectional Streaming RPC 的情況。

gRPC Client 使用的是 ClientGrpc 而不是 ClientProxy,使用方法也跟 ClientProxy 不同,我們會透過 ClientGrpc 取得對應服務的 Client,並透過該 Client 實例來進行 RPC Call。

NestJS 內建的 Transporter 到這邊告一段落了,如果有 NestJS 沒有內建、內建 Transporter 不夠好用或是沒有相關第三方套件該怎麼辦呢?下一篇會介紹 Custom Transporter,讓開發者可以自行定義 Transporter,敬請期待!


上一篇
[用NestJS闖蕩微服務!] DAY09 - gRPC Transporter (上)
下一篇
[用NestJS闖蕩微服務!] DAY11 - Custom Transporter (上)
系列文
用 NestJS 闖蕩微服務!21
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言